home *** CD-ROM | disk | FTP | other *** search
- #
- # randpool.py : Cryptographically strong random number generation
- #
- # Part of the Python Cryptography Toolkit
- #
- # Distribute and use freely; there are no restrictions on further
- # dissemination and usage except those imposed by the laws of your
- # country of residence. This software is provided "as is" without
- # warranty of fitness for use or suitability for any purpose, express
- # or implied. Use at your own risk or not at all.
- #
-
- __revision__ = "$Id: randpool.py,v 1.14 2004/05/06 12:56:54 akuchling Exp $"
-
- import time, array, types, warnings, os.path
- from Crypto.Util.number import long_to_bytes
- try:
- import Crypto.Util.winrandom as winrandom
- except:
- winrandom = None
-
- STIRNUM = 3
-
- class RandomPool:
- """randpool.py : Cryptographically strong random number generation.
-
- The implementation here is similar to the one in PGP. To be
- cryptographically strong, it must be difficult to determine the RNG's
- output, whether in the future or the past. This is done by using
- a cryptographic hash function to "stir" the random data.
-
- Entropy is gathered in the same fashion as PGP; the highest-resolution
- clock around is read and the data is added to the random number pool.
- A conservative estimate of the entropy is then kept.
-
- If a cryptographically secure random source is available (/dev/urandom
- on many Unixes, Windows CryptGenRandom on most Windows), then use
- it.
-
- Instance Attributes:
- bits : int
- Maximum size of pool in bits
- bytes : int
- Maximum size of pool in bytes
- entropy : int
- Number of bits of entropy in this pool.
-
- Methods:
- add_event([s]) : add some entropy to the pool
- get_bytes(int) : get N bytes of random data
- randomize([N]) : get N bytes of randomness from external source
- """
-
-
- def __init__(self, numbytes = 160, cipher=None, hash=None):
- if hash is None:
- from Crypto.Hash import SHA as hash
-
- # The cipher argument is vestigial; it was removed from
- # version 1.1 so RandomPool would work even in the limited
- # exportable subset of the code
- if cipher is not None:
- warnings.warn("'cipher' parameter is no longer used")
-
- if isinstance(hash, types.StringType):
- # ugly hack to force __import__ to give us the end-path module
- hash = __import__('Crypto.Hash.'+hash,
- None, None, ['new'])
- warnings.warn("'hash' parameter should now be a hashing module")
-
- self.bytes = numbytes
- self.bits = self.bytes*8
- self.entropy = 0
- self._hash = hash
-
- # Construct an array to hold the random pool,
- # initializing it to 0.
- self._randpool = array.array('B', [0]*self.bytes)
-
- self._event1 = self._event2 = 0
- self._addPos = 0
- self._getPos = hash.digest_size
- self._lastcounter=time.time()
- self.__counter = 0
-
- self._measureTickSize() # Estimate timer resolution
- self._randomize()
-
- def _updateEntropyEstimate(self, nbits):
- self.entropy += nbits
- if self.entropy < 0:
- self.entropy = 0
- elif self.entropy > self.bits:
- self.entropy = self.bits
-
- def _randomize(self, N = 0, devname = '/dev/urandom'):
- """_randomize(N, DEVNAME:device-filepath)
- collects N bits of randomness from some entropy source (e.g.,
- /dev/urandom on Unixes that have it, Windows CryptoAPI
- CryptGenRandom, etc)
- DEVNAME is optional, defaults to /dev/urandom. You can change it
- to /dev/random if you want to block till you get enough
- entropy.
- """
- data = ''
- if N <= 0:
- nbytes = int((self.bits - self.entropy)/8+0.5)
- else:
- nbytes = int(N/8+0.5)
- if winrandom:
- # Windows CryptGenRandom provides random data.
- data = winrandom.new().get_bytes(nbytes)
- elif os.path.exists(devname):
- # Many OSes support a /dev/urandom device
- try:
- f=open(devname)
- data=f.read(nbytes)
- f.close()
- except IOError, (num, msg):
- if num!=2: raise IOError, (num, msg)
- # If the file wasn't found, ignore the error
- if data:
- self._addBytes(data)
- # Entropy estimate: The number of bits of
- # data obtained from the random source.
- self._updateEntropyEstimate(8*len(data))
- self.stir_n() # Wash the random pool
-
- def randomize(self, N=0):
- """randomize(N:int)
- use the class entropy source to get some entropy data.
- This is overridden by KeyboardRandomize().
- """
- return self._randomize(N)
-
- def stir_n(self, N = STIRNUM):
- """stir_n(N)
- stirs the random pool N times
- """
- for i in xrange(N):
- self.stir()
-
- def stir (self, s = ''):
- """stir(s:string)
- Mix up the randomness pool. This will call add_event() twice,
- but out of paranoia the entropy attribute will not be
- increased. The optional 's' parameter is a string that will
- be hashed with the randomness pool.
- """
-
- entropy=self.entropy # Save inital entropy value
- self.add_event()
-
- # Loop over the randomness pool: hash its contents
- # along with a counter, and add the resulting digest
- # back into the pool.
- for i in range(self.bytes / self._hash.digest_size):
- h = self._hash.new(self._randpool)
- h.update(str(self.__counter) + str(i) + str(self._addPos) + s)
- self._addBytes( h.digest() )
- self.__counter = (self.__counter + 1) & 0xFFFFffffL
-
- self._addPos, self._getPos = 0, self._hash.digest_size
- self.add_event()
-
- # Restore the old value of the entropy.
- self.entropy=entropy
-
-
- def get_bytes (self, N):
- """get_bytes(N:int) : string
- Return N bytes of random data.
- """
-
- s=''
- i, pool = self._getPos, self._randpool
- h=self._hash.new()
- dsize = self._hash.digest_size
- num = N
- while num > 0:
- h.update( self._randpool[i:i+dsize] )
- s = s + h.digest()
- num = num - dsize
- i = (i + dsize) % self.bytes
- if i<dsize:
- self.stir()
- i=self._getPos
-
- self._getPos = i
- self._updateEntropyEstimate(- 8*N)
- return s[:N]
-
-
- def add_event(self, s=''):
- """add_event(s:string)
- Add an event to the random pool. The current time is stored
- between calls and used to estimate the entropy. The optional
- 's' parameter is a string that will also be XORed into the pool.
- Returns the estimated number of additional bits of entropy gain.
- """
- event = time.time()*1000
- delta = self._noise()
- s = (s + long_to_bytes(event) +
- 4*chr(0xaa) + long_to_bytes(delta) )
- self._addBytes(s)
- if event==self._event1 and event==self._event2:
- # If events are coming too closely together, assume there's
- # no effective entropy being added.
- bits=0
- else:
- # Count the number of bits in delta, and assume that's the entropy.
- bits=0
- while delta:
- delta, bits = delta>>1, bits+1
- if bits>8: bits=8
-
- self._event1, self._event2 = event, self._event1
-
- self._updateEntropyEstimate(bits)
- return bits
-
- # Private functions
- def _noise(self):
- # Adds a bit of noise to the random pool, by adding in the
- # current time and CPU usage of this process.
- # The difference from the previous call to _noise() is taken
- # in an effort to estimate the entropy.
- t=time.time()
- delta = (t - self._lastcounter)/self._ticksize*1e6
- self._lastcounter = t
- self._addBytes(long_to_bytes(long(1000*time.time())))
- self._addBytes(long_to_bytes(long(1000*time.clock())))
- self._addBytes(long_to_bytes(long(1000*time.time())))
- self._addBytes(long_to_bytes(long(delta)))
-
- # Reduce delta to a maximum of 8 bits so we don't add too much
- # entropy as a result of this call.
- delta=delta % 0xff
- return int(delta)
-
-
- def _measureTickSize(self):
- # _measureTickSize() tries to estimate a rough average of the
- # resolution of time that you can see from Python. It does
- # this by measuring the time 100 times, computing the delay
- # between measurements, and taking the median of the resulting
- # list. (We also hash all the times and add them to the pool)
- interval = [None] * 100
- h = self._hash.new(`(id(self),id(interval))`)
-
- # Compute 100 differences
- t=time.time()
- h.update(`t`)
- i = 0
- j = 0
- while i < 100:
- t2=time.time()
- h.update(`(i,j,t2)`)
- j += 1
- delta=int((t2-t)*1e6)
- if delta:
- interval[i] = delta
- i += 1
- t=t2
-
- # Take the median of the array of intervals
- interval.sort()
- self._ticksize=interval[len(interval)/2]
- h.update(`(interval,self._ticksize)`)
- # mix in the measurement times and wash the random pool
- self.stir(h.digest())
-
- def _addBytes(self, s):
- "XOR the contents of the string S into the random pool"
- i, pool = self._addPos, self._randpool
- for j in range(0, len(s)):
- pool[i]=pool[i] ^ ord(s[j])
- i=(i+1) % self.bytes
- self._addPos = i
-
- # Deprecated method names: remove in PCT 2.1 or later.
- def getBytes(self, N):
- warnings.warn("getBytes() method replaced by get_bytes()",
- DeprecationWarning)
- return self.get_bytes(N)
-
- def addEvent (self, event, s=""):
- warnings.warn("addEvent() method replaced by add_event()",
- DeprecationWarning)
- return self.add_event(s + str(event))
-
- class PersistentRandomPool (RandomPool):
- def __init__ (self, filename=None, *args, **kwargs):
- RandomPool.__init__(self, *args, **kwargs)
- self.filename = filename
- if filename:
- try:
- # the time taken to open and read the file might have
- # a little disk variability, modulo disk/kernel caching...
- f=open(filename, 'rb')
- self.add_event()
- data = f.read()
- self.add_event()
- # mix in the data from the file and wash the random pool
- self.stir(data)
- f.close()
- except IOError:
- # Oh, well; the file doesn't exist or is unreadable, so
- # we'll just ignore it.
- pass
-
- def save(self):
- if self.filename == "":
- raise ValueError, "No filename set for this object"
- # wash the random pool before save, provides some forward secrecy for
- # old values of the pool.
- self.stir_n()
- f=open(self.filename, 'wb')
- self.add_event()
- f.write(self._randpool.tostring())
- f.close()
- self.add_event()
- # wash the pool again, provide some protection for future values
- self.stir()
-
- # non-echoing Windows keyboard entry
- _kb = 0
- if not _kb:
- try:
- import msvcrt
- class KeyboardEntry:
- def getch(self):
- c = msvcrt.getch()
- if c in ('\000', '\xe0'):
- # function key
- c += msvcrt.getch()
- return c
- def close(self, delay = 0):
- if delay:
- time.sleep(delay)
- while msvcrt.kbhit():
- msvcrt.getch()
- _kb = 1
- except:
- pass
-
- # non-echoing Posix keyboard entry
- if not _kb:
- try:
- import termios
- class KeyboardEntry:
- def __init__(self, fd = 0):
- self._fd = fd
- self._old = termios.tcgetattr(fd)
- new = termios.tcgetattr(fd)
- new[3]=new[3] & ~termios.ICANON & ~termios.ECHO
- termios.tcsetattr(fd, termios.TCSANOW, new)
- def getch(self):
- termios.tcflush(0, termios.TCIFLUSH) # XXX Leave this in?
- return os.read(self._fd, 1)
- def close(self, delay = 0):
- if delay:
- time.sleep(delay)
- termios.tcflush(self._fd, termios.TCIFLUSH)
- termios.tcsetattr(self._fd, termios.TCSAFLUSH, self._old)
- _kb = 1
- except:
- pass
-
- class KeyboardRandomPool (PersistentRandomPool):
- def __init__(self, *args, **kwargs):
- PersistentRandomPool.__init__(self, *args, **kwargs)
-
- def randomize(self, N = 0):
- "Adds N bits of entropy to random pool. If N is 0, fill up pool."
- import os, string, time
- if N <= 0:
- bits = self.bits - self.entropy
- else:
- bits = N*8
- if bits == 0:
- return
- print bits,'bits of entropy are now required. Please type on the keyboard'
- print 'until enough randomness has been accumulated.'
- kb = KeyboardEntry()
- s='' # We'll save the characters typed and add them to the pool.
- hash = self._hash
- e = 0
- try:
- while e < bits:
- temp=str(bits-e).rjust(6)
- os.write(1, temp)
- s=s+kb.getch()
- e += self.add_event(s)
- os.write(1, 6*chr(8))
- self.add_event(s+hash.new(s).digest() )
- finally:
- kb.close()
- print '\n\007 Enough. Please wait a moment.\n'
- self.stir_n() # wash the random pool.
- kb.close(4)
-
- if __name__ == '__main__':
- pool = RandomPool()
- print 'random pool entropy', pool.entropy, 'bits'
- pool.add_event('something')
- print `pool.get_bytes(100)`
- import tempfile, os
- fname = tempfile.mktemp()
- pool = KeyboardRandomPool(filename=fname)
- print 'keyboard random pool entropy', pool.entropy, 'bits'
- pool.randomize()
- print 'keyboard random pool entropy', pool.entropy, 'bits'
- pool.randomize(128)
- pool.save()
- saved = open(fname, 'rb').read()
- print 'saved', `saved`
- print 'pool ', `pool._randpool.tostring()`
- newpool = PersistentRandomPool(fname)
- print 'persistent random pool entropy', pool.entropy, 'bits'
- os.remove(fname)
-